﻿#if NET40
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SourceCore.cs
//
//
// The core implementation of a standard ISourceBlock<TOutput>.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Linq;
using CuteAnt.AsyncEx;

namespace System.Threading.Tasks.Dataflow.Internal
{
  // LOCK-LEVELING SCHEME
  // --------------------
  // SourceCore employs two locks: OutgoingLock and ValueLock.  Additionally, targets we call out to
  // likely utilize their own IncomingLock.  We can hold OutgoingLock while acquiring ValueLock or IncomingLock.
  // However, we cannot hold ValueLock while calling out to external code or while acquiring OutgoingLock, and
  // we cannot hold IncomingLock when acquiring OutgoingLock. Additionally, the locks employed must be reentrant.

  /// <summary>Provides a core implementation for blocks that implement <see cref="ISourceBlock{TOutput}"/>.</summary>
  /// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="SourceCore{TOutput}"/>.</typeparam>
  [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
  [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
  internal sealed class SourceCore<TOutput>
  {
    #region -- These fields are readonly and are initialized to new instances at construction. --

    /// <summary>A TaskCompletionSource that represents the completion of this block.</summary>
    /// <remarks>This same instance is also handed out by <c>OutgoingLock</c> and used as a monitor object.</remarks>
    private readonly TaskCompletionSource<VoidResult> _completionTask = new TaskCompletionSource<VoidResult>();

    /// <summary>A registry used to store all linked targets and information about them.</summary>
    /// <remarks>
    /// Assigned in the constructor (it needs the owning source).  This same instance is also handed
    /// out by <c>ValueLock</c> and used as a monitor object.
    /// </remarks>
    private readonly TargetRegistry<TOutput> _targetRegistry;

    /// <summary>The output messages queued up to be received by consumers/targets.</summary>
    /// <remarks>
    /// The queue is only ever accessed by a single producer and single consumer at a time.  On the producer side,
    /// we require that AddMessage/AddMessages are the only places the queue is added to, and we require that those
    /// methods not be used concurrently with anything else.  All of our target halves today follow that restriction;
    /// for example, TransformBlock with DOP==1 will have at most a single task processing the user provided delegate,
    /// and thus at most one task calling AddMessage.  If it has a DOP > 1, it'll go through the ReorderingBuffer,
    /// which will use a lock to synchronize the output of all of the processing tasks such that only one is using
    /// AddMessage at a time.  On the consumer side of SourceCore, all consumption is protected by ValueLock, and thus
    /// all consumption is serialized.
    /// </remarks>
    private readonly SingleProducerSingleConsumerQueue<TOutput> _messages = new SingleProducerSingleConsumerQueue<TOutput>(); // producer side: AddMessage/AddMessages only; consumer side: under ValueLock

    /// <summary>
    /// Gets the object to use as the outgoing lock.  The completion source instance doubles as
    /// the monitor, so no dedicated lock object is allocated.
    /// </summary>
    private Object OutgoingLock => _completionTask;

    /// <summary>
    /// Gets the object to use as the value lock.  The target registry instance doubles as
    /// the monitor, so no dedicated lock object is allocated.
    /// </summary>
    private Object ValueLock => _targetRegistry;

    #endregion

    #region -- These fields are readonly and are initialized by arguments to the constructor. --

    /// <summary>The source utilizing this helper.</summary>
    /// <remarks>Passed as the source argument to every callback below and used to construct the target registry.</remarks>
    private readonly ISourceBlock<TOutput> _owningSource;

    /// <summary>The options used to configure this block's execution.</summary>
    private readonly DataflowBlockOptions _dataflowBlockOptions;

    /// <summary>An action to be invoked on the owner block to stop accepting messages.
    /// This action is invoked when SourceCore encounters an exception.</summary>
    private readonly Action<ISourceBlock<TOutput>> _completeAction;

    /// <summary>An action to be invoked on the owner block when an item is removed.
    /// This may be null if the owner block doesn't need to be notified.</summary>
    /// <remarks>The second argument is the number of items removed, as computed via <c>_itemCountingFunc</c> when one is supplied.</remarks>
    private readonly Action<ISourceBlock<TOutput>, Int32> _itemsRemovedAction;

    /// <summary>
    /// Optional function that counts the individual items contained in removed output.  May be null,
    /// in which case a single message counts as 1 and a batch counts as its number of messages.
    /// </summary>
    /// <remarks>
    /// Called as (owner, item, null) when a single message was removed, and as
    /// (owner, default(TOutput), items) when a whole list of messages was removed.
    /// </remarks>
    private readonly Func<ISourceBlock<TOutput>, TOutput, IList<TOutput>, Int32> _itemCountingFunc;

    #endregion

    #region -- These fields are mutated during execution. --

    /// <summary>The task used to process the output and offer it to targets.</summary>
    /// <remarks>AddMessage/AddMessages read this without a lock (after a memory barrier) to decide whether offering must be kicked off.</remarks>
    private Task _taskForOutputProcessing; // protected by ValueLock

    /// <summary>Counter for message IDs unique within this source block.</summary>
    /// <remarks>Holds the ID of the message currently at the head of the queue; it is incremented each time the head is removed.</remarks>
    private PaddedInt64 _nextMessageId = new PaddedInt64 { Value = 1 }; // We are going to use this value before incrementing.  Protected by ValueLock.

    /// <summary>The target that the next message is reserved for, or null if nothing is reserved.</summary>
    private ITargetBlock<TOutput> _nextMessageReservedFor; // protected by OutgoingLock

    /// <summary>Whether all future messages should be declined.</summary>
    /// <remarks>Set by Complete; AddMessage/AddMessages check it without a lock and silently drop their input once it is true.</remarks>
    private Boolean _decliningPermanently; // Protected by ValueLock

    /// <summary>Whether this block should again attempt to offer messages to targets.</summary>
    /// <remarks>
    /// Cleared when the head message gets reserved or when no target accepted it; set again whenever
    /// the head of the queue changes or a reservation is released.
    /// </remarks>
    private Boolean _enableOffering = true; // Protected by ValueLock, sometimes read with volatile reads

    /// <summary>Whether someone has reserved the right to call CompleteBlockOncePossible.</summary>
    /// <remarks>Once set, LinkTo no longer registers new targets.</remarks>
    private Boolean _completionReserved; // Protected by OutgoingLock

    /// <summary>Exceptions that may have occurred and gone unhandled during processing.</summary>
    /// <remarks>Null until the first exception is added.</remarks>
    private List<Exception> _exceptions; // Protected by ValueLock, sometimes read with volatile reads

    #endregion

    #region @@ Properties @@

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
    internal Task Completion => _completionTask.Task;

    /// <summary>Gets the number of items available to be received from this block.</summary>
    internal Int32 OutputCount
    {
      get
      {
        // Acquire in lock-leveling order: OutgoingLock first, then ValueLock.
        lock (OutgoingLock)
        {
          lock (ValueLock)
          {
            return _messages.Count;
          }
        }
      }
    }

    /// <summary>Gets whether the _exceptions list is non-null.</summary>
    /// <remarks>
    /// No lock is taken here: the field reference is fetched with a volatile read, which is
    /// sufficient for a simple null check.
    /// </remarks>
    internal Boolean HasExceptions => Volatile.Read(ref _exceptions) != null;

    /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
    internal DataflowBlockOptions DataflowBlockOptions => _dataflowBlockOptions;

    /// <summary>Gets whether the source has had cancellation requested or an exception has occurred.</summary>
    private Boolean CanceledOrFaulted
    {
      get
      {
        // Cancellation takes effect the moment the token is signaled.
        if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
        {
          return true;
        }

        // A fault only counts once an exception has been recorded AND the owning
        // block has invoked Complete on us (i.e. we are declining permanently).
        return HasExceptions && _decliningPermanently;
      }
    }

    /// <summary>Gets the object to display in the debugger display attribute.</summary>
    [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
    private Object DebuggerDisplayContent
    {
      get
      {
        // Prefer the owner's own debugger content when it provides one;
        // otherwise fall back to the owner object itself.
        var debuggerView = _owningSource as IDebuggerDisplay;
        Object shown;
        if (debuggerView != null)
        {
          shown = debuggerView.Content;
        }
        else
        {
          shown = _owningSource;
        }
        return "Block=\"{0}\"".FormatWith(shown);
      }
    }

    #endregion

    #region @@ Constructors @@

    /// <summary>Initializes the source core.</summary>
    /// <param name="owningSource">The source block this core works on behalf of.</param>
    /// <param name="dataflowBlockOptions">The options that configure the block.</param>
    /// <param name="completeAction">Action that declines the associated target half, which will in turn decline this source core.</param>
    /// <param name="itemsRemovedAction">Action invoked whenever one or more items are removed.  This may be null.</param>
    /// <param name="itemCountingFunc">
    /// Function used when the owner needs to count the number of individual
    /// items in an output or set of outputs.  This may be null.
    /// </param>
    internal SourceCore(
        ISourceBlock<TOutput> owningSource, DataflowBlockOptions dataflowBlockOptions,
        Action<ISourceBlock<TOutput>> completeAction,
        Action<ISourceBlock<TOutput>, Int32> itemsRemovedAction = null,
        Func<ISourceBlock<TOutput>, TOutput, IList<TOutput>, Int32> itemCountingFunc = null)
    {
      // Required arguments (checked in debug builds only)
      Debug.Assert(owningSource != null, "Core must be associated with a source.");
      Debug.Assert(dataflowBlockOptions != null, "Options must be provided to configure the core.");
      Debug.Assert(completeAction != null, "Action to invoke on completion is required.");

      // Owner and configuration
      _owningSource = owningSource;
      _dataflowBlockOptions = dataflowBlockOptions;

      // Callbacks into the owner (the last two are optional)
      _completeAction = completeAction;
      _itemsRemovedAction = itemsRemovedAction;
      _itemCountingFunc = itemCountingFunc;

      // The registry needs the owner, so it can only be built here
      _targetRegistry = new TargetRegistry<TOutput>(_owningSource);
    }

    #endregion

    #region == LinkTo ==

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
    [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
    internal IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
    {
      // Argument checks
      if (target == null)
      {
        throw new ArgumentNullException(nameof(target));
      }
      if (linkOptions == null)
      {
        throw new ArgumentNullException(nameof(linkOptions));
      }
      Contract.EndContractBlock();

      var completion = _completionTask.Task;

      // Fast path for an already completed block: nothing will ever be offered again,
      // so honor completion propagation (if asked for) and return without locking.
      if (completion.IsCompleted)
      {
        if (linkOptions.PropagateCompletion)
        {
          Common.PropagateCompletion(completion, target, exceptionHandler: null);
        }
        return Disposables.Nop;
      }

      lock (OutgoingLock)
      {
        // Once completion is reserved the registry has been, or is about to be, cleared.
        // Only register the target and offer to it while that has not happened.
        if (!_completionReserved)
        {
          _targetRegistry.Add(ref target, linkOptions);
          OfferToTargets(linkToTarget: target);
          return Common.CreateUnlinker(OutgoingLock, _targetRegistry, target);
        }
      }

      // Completion is underway: no messages may be offered in this state, but completion
      // still has to reach the target when propagation was requested.
      if (linkOptions.PropagateCompletion)
      {
        Common.PropagateCompletionOnceCompleted(completion, target);
      }
      return Disposables.Nop;
    }

    #endregion

    #region == ConsumeMessage ==

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
    internal TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
    {
      // Argument checks
      if (!messageHeader.IsValid)
      {
        throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
      }
      if (target == null)
      {
        throw new ArgumentNullException(nameof(target));
      }
      Contract.EndContractBlock();

      TOutput value = default(TOutput);

      lock (OutgoingLock)
      {
        // A message reserved for a different target cannot be consumed by this one.
        // Consumption is allowed when there is no reservation or the caller holds it.
        var reservedFor = _nextMessageReservedFor;
        if (reservedFor != null && reservedFor != target)
        {
          messageConsumed = false;
          return default(TOutput);
        }

        lock (ValueLock)
        {
          // Only the message at the head of the queue may be consumed.
          var isHeadMessage = messageHeader.Id == _nextMessageId.Value;
          if (!isHeadMessage || !_messages.TryDequeue(out value))
          {
            messageConsumed = false;
            return default(TOutput);
          }

          // The target owns the message now: drop any reservation, unlink the target if it
          // reached its message limit, turn offering back on for the new head message, then
          // complete if possible and otherwise get asynchronous offering going.
          _nextMessageReservedFor = null;
          _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
          _enableOffering = true;
          _nextMessageId.Value++;
          CompleteBlockIfPossible();
          OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
        }
      }

      // Outside of the locks: tell the owner how many items just left the block.
      if (_itemsRemovedAction != null)
      {
        Int32 removedCount = 1;
        if (_itemCountingFunc != null)
        {
          removedCount = _itemCountingFunc(_owningSource, value, null);
        }
        _itemsRemovedAction(_owningSource, removedCount);
      }

      messageConsumed = true;
      return value;
    }

    #endregion

    #region == ReserveMessage ==

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
    internal Boolean ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
    {
      // Argument checks
      if (!messageHeader.IsValid)
      {
        throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
      }
      if (target == null)
      {
        throw new ArgumentNullException(nameof(target));
      }
      Contract.EndContractBlock();

      lock (OutgoingLock)
      {
        // Somebody already holds a reservation: refuse.
        if (_nextMessageReservedFor != null)
        {
          return false;
        }

        lock (ValueLock)
        {
          // Only the message currently at the head of the queue can be reserved.
          if (messageHeader.Id != _nextMessageId.Value || _messages.IsEmpty)
          {
            return false;
          }

          // Grant the reservation; while it is held nothing gets offered.
          _nextMessageReservedFor = target;
          _enableOffering = false;
          return true;
        }
      }
    }

    #endregion

    #region == ReleaseReservation ==

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
    internal void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
    {
      // Argument checks
      if (!messageHeader.IsValid)
      {
        throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
      }
      if (target == null)
      {
        throw new ArgumentNullException(nameof(target));
      }
      Contract.EndContractBlock();

      lock (OutgoingLock)
      {
        // Only the holder of the reservation may release it.
        if (_nextMessageReservedFor != target)
        {
          throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
        }

        lock (ValueLock)
        {
          // The reservation always refers to the head message; anything else is invalid.
          var refersToHead = messageHeader.Id == _nextMessageId.Value && !_messages.IsEmpty;
          if (!refersToHead)
          {
            throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);
          }

          // Drop the reservation and switch offering back on.
          _nextMessageReservedFor = null;
          Debug.Assert(!_enableOffering, "Offering should have been disabled if there was a valid reservation");
          _enableOffering = true;

          // At least one message is available again, so get it offered.
          // (The callee bails out by itself when a cancellation is pending.)
          OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);

          // The reservation might have been the only thing delaying completion.
          CompleteBlockIfPossible();
        }
      }
    }

    #endregion

    #region == TryReceive ==

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
    internal Boolean TryReceive(Predicate<TOutput> filter, out TOutput item)
    {
      item = default(TOutput);
      var received = false;

      lock (OutgoingLock)
      {
        // A reserved head message belongs to the reserving target; nothing can be received then.
        if (_nextMessageReservedFor == null)
        {
          lock (ValueLock)
          {
            // Dequeue the head message if there is one and it passes the filter (if any).
            received = _messages.TryDequeueIf(filter, out item);
            if (received)
            {
              _nextMessageId.Value++;

              // There is a new head message, so offering may resume if it had been disabled.
              _enableOffering = true;

              // Removing this item may have been the block's last piece of work.
              CompleteBlockIfPossible();

              // The head of the queue changed; offer asynchronously as appropriate.
              OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
            }
          }
        }
      }

      if (!received)
      {
        return false;
      }

      // Outside of the locks: tell the owner how many items just left the block.
      if (_itemsRemovedAction != null)
      {
        Int32 removedCount = 1;
        if (_itemCountingFunc != null)
        {
          removedCount = _itemCountingFunc(_owningSource, item, null);
        }
        _itemsRemovedAction(_owningSource, removedCount);
      }
      return true;
    }

    #endregion

    #region == TryReceiveAll ==

    /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
    internal Boolean TryReceiveAll(out IList<TOutput> items)
    {
      items = null;
      var drainedCount = 0;

      lock (OutgoingLock)
      {
        // A reserved head message belongs to the reserving target; nothing can be received then.
        if (_nextMessageReservedFor == null)
        {
          lock (ValueLock)
          {
            if (!_messages.IsEmpty)
            {
              // Drain the whole queue into a fresh list.
              var drained = new List<TOutput>();
              TOutput current;
              while (_messages.TryDequeue(out current))
              {
                drained.Add(current);
              }
              drainedCount = drained.Count;
              items = drained;

              // Bump the ID once; any new value will do.
              _nextMessageId.Value++;

              // The head of the queue changed, so offering may resume if it had been disabled.
              _enableOffering = true;

              // The queue is empty now, which may allow the block to complete.
              CompleteBlockIfPossible();
            }
          }
        }
      }

      if (drainedCount <= 0)
      {
        return false;
      }

      // Outside of the locks: tell the owner how many items just left the block.
      if (_itemsRemovedAction != null)
      {
        Int32 removedCount = drainedCount;
        if (_itemCountingFunc != null)
        {
          removedCount = _itemCountingFunc(_owningSource, default(TOutput), items);
        }
        _itemsRemovedAction(_owningSource, removedCount);
      }
      return true;
    }

    #endregion

    #region == AddMessage ==

    /// <summary>Adds a message to the source block for propagation.
    /// This method must only be used by one thread at a time, and must not be used concurrently
    /// with any other producer side methods, e.g. AddMessages, Complete.</summary>
    /// <remarks>The item is silently dropped if the block is already declining permanently.</remarks>
    /// <param name="item">The item to be wrapped in a message to be added.</param>
    internal void AddMessage(TOutput item)
    {
      // This method must not take the OutgoingLock, as it will likely be called in situations
      // where an IncomingLock is held.

      // Read without a lock; once Complete has run, new items are discarded.
      if (_decliningPermanently) { return; }
      _messages.Enqueue(item);

#if !NET40
      Interlocked.MemoryBarrier(); // ensure the read of _taskForOutputProcessing doesn't move up before the writes in Enqueue
#else
      Thread.MemoryBarrier(); // ensure the read of _taskForOutputProcessing doesn't move up before the writes in Enqueue
#endif

      // Only fall into the slower path when no output-processing task is currently recorded.
      if (_taskForOutputProcessing == null)
      {
        // Separated out to enable inlining of AddMessage
        OfferAsyncIfNecessaryWithValueLock();
      }
    }

    /// <summary>Adds messages to the source block for propagation.
    /// This method must only be used by one thread at a time, and must not be used concurrently
    /// with any other producer side methods, e.g. AddMessage, Complete.</summary>
    /// <param name="items">The items to be wrapped in messages and queued, in enumeration order.</param>
    internal void AddMessages(IEnumerable<TOutput> items)
    {
      Debug.Assert(items != null, "Items list must be valid.");

      // The OutgoingLock must not be taken here: callers are likely to be holding
      // an IncomingLock when they invoke this method.

      if (_decliningPermanently) { return; }

      // Lists and arrays are indexed directly, which avoids both the enumerator
      // allocation of a foreach and the interface calls that come with it.
      // Everything else goes through plain enumeration.
      List<TOutput> list = items as List<TOutput>;
      TOutput[] array;
      if (list != null)
      {
        for (Int32 index = 0; index < list.Count; index++)
        {
          _messages.Enqueue(list[index]);
        }
      }
      else if ((array = items as TOutput[]) != null)
      {
        for (Int32 index = 0; index < array.Length; index++)
        {
          _messages.Enqueue(array[index]);
        }
      }
      else
      {
        foreach (TOutput current in items)
        {
          _messages.Enqueue(current);
        }
      }

#if !NET40
      Interlocked.MemoryBarrier(); // ensure the read of _taskForOutputProcessing doesn't move up before the writes in Enqueue
#else
      Thread.MemoryBarrier(); // ensure the read of _taskForOutputProcessing doesn't move up before the writes in Enqueue
#endif

      if (_taskForOutputProcessing == null)
      {
        OfferAsyncIfNecessaryWithValueLock();
      }
    }

    #endregion

    #region == AddException ==

    /// <summary>Records a single exception on this source.</summary>
    /// <param name="exception">The exception to record.</param>
    internal void AddException(Exception exception)
    {
      Debug.Assert(exception != null, "Valid exception must be provided to be added.");
      Debug.Assert(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");

      // The exception list is guarded by the value lock.
      lock (ValueLock)
      {
        Common.AddException(ref _exceptions, exception);
      }
    }

    /// <summary>Records every exception of the given list on this source.</summary>
    /// <param name="exceptions">The exceptions to record.</param>
    internal void AddExceptions(List<Exception> exceptions)
    {
      Debug.Assert(exceptions != null, "Valid exceptions must be provided to be added.");
      Debug.Assert(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");

      // One acquisition of the value lock covers the whole batch.
      lock (ValueLock)
      {
        foreach (Exception current in exceptions)
        {
          Common.AddException(ref _exceptions, current);
        }
      }
    }

    #endregion

    #region == AddAndUnwrapAggregateException ==

    /// <summary>Records the inner exceptions of an AggregateException on this source.</summary>
    /// <param name="aggregateException">The aggregate whose inner exceptions are to be recorded.</param>
    internal void AddAndUnwrapAggregateException(AggregateException aggregateException)
    {
      Debug.Assert(aggregateException != null && aggregateException.InnerExceptions.Count > 0, "Aggregate must be valid and contain inner exceptions to unwrap.");
      Debug.Assert(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");

      // The exception list is guarded by the value lock.
      lock (ValueLock)
      {
        Common.AddException(ref _exceptions, aggregateException, unwrapInnerExceptions: true);
      }
    }

    #endregion

    #region == Complete ==

    /// <summary>Informs the block that it will not be receiving additional messages.</summary>
    /// <remarks>
    /// The permanent-decline flag is set synchronously; the actual completion check is queued
    /// to the default scheduler and takes OutgoingLock and ValueLock there.
    /// </remarks>
    internal void Complete()
    {
      lock (ValueLock)
      {
        _decliningPermanently = true;

        // The caller may be holding an incoming lock, and CompleteBlockIfPossible must not run
        // under one.  With the decline flag already set, the timing of that check no longer
        // matters, so it is pushed onto a separate task that acquires the locks it needs in
        // a context where doing so is known to be safe.
        Action<Object> completeIfPossible = state =>
        {
          var core = (SourceCore<TOutput>)state;
          lock (core.OutgoingLock)
          {
            lock (core.ValueLock)
            {
              core.CompleteBlockIfPossible();
            }
          }
        };

        Task.Factory.StartNew(
            completeIfPossible, this, CancellationToken.None,
            Common.GetCreationOptionsForTask(), TaskScheduler.Default);
      }
    }

    #endregion

    #region ** OfferToTargets **

    /// <summary>
    /// Offers the message at the head of the queue to the linked targets (or only to a newly
    /// linked target) and, if one accepts it, removes it from the queue.
    /// </summary>
    /// <remarks>
    /// Must be called with OutgoingLock held and ValueLock not held; ValueLock is taken
    /// internally only around the state updates.
    /// </remarks>
    /// <param name="linkToTarget">
    /// The newly linked target, if OfferToTargets is being called to synchronously
    /// propagate to a target during a LinkTo operation.
    /// </param>
    /// <returns>true if a target accepted the offered message; otherwise, false.</returns>
    private Boolean OfferToTargets(ITargetBlock<TOutput> linkToTarget = null)
    {
      Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
      Common.ContractAssertMonitorStatus(ValueLock, held: false);

      // If the next message is reserved, we can't offer anything
      if (_nextMessageReservedFor != null) { return false; }

      // Peek at the next message if there is one, so we can offer it.
      var header = default(DataflowMessageHeader);
      TOutput message = default(TOutput);
      var offerJustToLinkToTarget = false;

      // If offering isn't enabled and if we're not doing this as
      // a result of LinkTo, bail. Otherwise, with offering disabled, we must have
      // already offered this message to all existing targets, so we can just offer
      // it to the newly linked target.
      if (!Volatile.Read(ref _enableOffering))
      {
        if (linkToTarget == null) { return false; }
        else { offerJustToLinkToTarget = true; }
      }

      // Otherwise, peek at message to offer.  The header stays at its default
      // (checked via IsValid below) when the queue is empty.
      if (_messages.TryPeek(out message))
      {
        header = new DataflowMessageHeader(_nextMessageId.Value);
      }

      // If there is a message, offer it.
      var messageWasAccepted = false;
      if (header.IsValid)
      {
        if (offerJustToLinkToTarget)
        {
          // If we've already offered the message to everyone else,
          // we can just offer it to the newly linked target
          Debug.Assert(linkToTarget != null, "Must have a valid target to offer to.");
          OfferMessageToTarget(header, message, linkToTarget, out messageWasAccepted);
        }
        else
        {
          // Otherwise, we've not yet offered this message to anyone, so even
          // if linkToTarget is non-null, we need to propagate the message in order
          // through all of the registered targets, the last of which will be the linkToTarget
          // if it's non-null (no need to special-case it, though).

          // Note that during OfferMessageToTarget, a target may call ConsumeMessage (taking advantage of the
          // reentrancy of OutgoingLock), which may unlink the target if the target is registered as "unlinkAfterOne".
          // Doing so will remove the target from the targets list. As such, we maintain the next node
          // separately from cur.Next, in case cur.Next changes by cur being removed from the list.
          // No other node in the list should change, as we're protected by OutgoingLock.

          var cur = _targetRegistry.FirstTargetNode;
          while (cur != null)
          {
            var next = cur.Next;
            // A true return value means the message must not be offered to any further target.
            if (OfferMessageToTarget(header, message, cur.Target, out messageWasAccepted)) { break; }
            cur = next;
          }

          // If none of the targets accepted the message, disable offering.
          if (!messageWasAccepted)
          {
            lock (ValueLock)
            {
              _enableOffering = false;
            }
          }
        }
      }

      // If a message got accepted, consume it and reenable offering.
      if (messageWasAccepted)
      {
        lock (ValueLock)
        {
          // SourceCore set consumeToAccept to false.  However, it's possible
          // that an incorrectly written target may ignore that parameter and synchronously consume
          // even though they weren't supposed to.  To recover from that,
          // we'll only dequeue if the correct message is still at the head of the queue.
          // However, we'll assert so that we can at least catch this in our own debug builds.
          TOutput dropped;
          if (_nextMessageId.Value != header.Id ||
              !_messages.TryDequeue(out dropped)) // remove the next message
          {
            Debug.Assert(false, "The target did not follow the protocol.");
          }
          _nextMessageId.Value++;

          // The message was accepted, so there's now going to be a new next message.
          // If offering had been disabled, reenable it.
          _enableOffering = true;

          // Now that a message has been removed, we need to complete if possible
          // or asynchronously offer if necessary.  However, if we're calling this as part of our
          // offering loop, we won't be able to do either, since by definition there's already
          // a processing task spun up (us) that would prevent these things.  So we only
          // do the checks if we're being called to link a new target rather than as part
          // of normal processing.
          if (linkToTarget != null)
          {
            CompleteBlockIfPossible();
            OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: true);
          }
        }

        // Notify the owner block that our count has decreased (done after releasing ValueLock)
        if (_itemsRemovedAction != null)
        {
          Int32 count = _itemCountingFunc != null ? _itemCountingFunc(_owningSource, message, null) : 1;
          _itemsRemovedAction(_owningSource, count);
        }
      }

      return messageWasAccepted;
    }

    #endregion

    #region ** OfferMessageToTarget **

    /// <summary>Offers one message to one target and interprets the status the target returns.</summary>
    /// <param name="header">The header of the message to offer.</param>
    /// <param name="message">The message being offered.</param>
    /// <param name="target">The single target to which the message should be offered.</param>
    /// <param name="messageWasAccepted">Set to true if the target accepted the message; otherwise, false.</param>
    /// <returns>
    /// true if the message should not be offered to any further targets (it was accepted,
    /// or a reservation is currently outstanding); false if propagation may continue.
    /// </returns>
    /// <remarks>On entry this asserts that OutgoingLock is held and that ValueLock is not.</remarks>
    private Boolean OfferMessageToTarget(
        DataflowMessageHeader header, TOutput message, ITargetBlock<TOutput> target,
        out Boolean messageWasAccepted)
    {
      Debug.Assert(target != null, "Valid target to offer to is required.");
      Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
      Common.ContractAssertMonitorStatus(ValueLock, held: false);

      // The offer is made with consumeToAccept disabled.
      DataflowMessageStatus status = target.OfferMessage(header, message, _owningSource, consumeToAccept: false);
      Debug.Assert(status != DataflowMessageStatus.NotAvailable, "Messages are not being offered concurrently, so nothing should be missed.");
      messageWasAccepted = false;

      switch (status)
      {
        case DataflowMessageStatus.Accepted:
          // Accepted: ask the registry to drop the target, but only if it has
          // reached its maximum number of messages, then report the acceptance.
          _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
          messageWasAccepted = true;
          return true; // nobody else should see this message

        case DataflowMessageStatus.DecliningPermanently:
          // The target will never accept anything again, so unlink it and move on.
          _targetRegistry.Remove(target);
          return false; // keep offering to the remaining targets

        default:
          // A reservation on the next message ends propagation for now.
          if (_nextMessageReservedFor != null)
          {
            Debug.Assert(status == DataflowMessageStatus.Postponed,
                "If the message was reserved, it should also have been postponed.");
            return true; // nobody else should see this message
          }

          // Otherwise there is nothing more to do for this target; the message
          // stays at the front of the queue until someone claims it.
          return false; // keep offering to the remaining targets
      }
    }

    #endregion

    #region ** OfferAsyncIfNecessaryWithValueLock **

    /// <summary>
    /// Acquires ValueLock and, while holding it, delegates to OfferAsyncIfNecessary
    /// to enable asynchronous offering of messages to targets.
    /// </summary>
    private void OfferAsyncIfNecessaryWithValueLock()
    {
      var valueLock = ValueLock;
      lock (valueLock)
      {
        // This is not a replacement for a previous loop, and the caller makes
        // no claim about whether the outgoing lock is held.
        OfferAsyncIfNecessary(isReplacementReplica: false, outgoingLockKnownAcquired: false);
      }
    }

    #endregion

    #region ** OfferAsyncIfNecessary **

    /// <summary>Enables asynchronous offering of messages to targets when there is a reason to.</summary>
    /// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
    /// <param name="outgoingLockKnownAcquired">Whether the caller is sure that the outgoing lock is currently held by this thread.</param>
    /// <remarks>On entry this asserts that ValueLock is held.</remarks>
    private void OfferAsyncIfNecessary(Boolean isReplacementReplica, Boolean outgoingLockKnownAcquired)
    {
      Common.ContractAssertMonitorStatus(ValueLock, held: true);

      // Fast path, kept deliberately small so that this method is a good inlining
      // candidate: bail out if a processing task already exists, offering is
      // disabled, or there are no messages waiting to be offered.
      if (_taskForOutputProcessing != null || !_enableOffering || _messages.IsEmpty)
      {
        return;
      }

      // Slow path: do additional checks and potentially launch a new task.
      OfferAsyncIfNecessary_Slow(isReplacementReplica, outgoingLockKnownAcquired);
    }

    #endregion

    #region ** OfferAsyncIfNecessary_Slow **

    /// <summary>
    /// Slow path of OfferAsyncIfNecessary: creates and schedules the task that runs
    /// OfferMessagesLoopCore, and handles a failure to schedule that task.
    /// </summary>
    /// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
    /// <param name="outgoingLockKnownAcquired">Whether the caller is sure that the outgoing lock is currently held by this thread.</param>
    /// <remarks>
    /// On entry this asserts that ValueLock is held, that no processing task exists, that offering
    /// is enabled, and that messages are available.  If the task cannot be started, the exception
    /// is recorded once, the block is switched to declining permanently, and completion is
    /// attempted from a separate task that re-acquires OutgoingLock and ValueLock.
    /// </remarks>
    private void OfferAsyncIfNecessary_Slow(Boolean isReplacementReplica, Boolean outgoingLockKnownAcquired)
    {
      Common.ContractAssertMonitorStatus(ValueLock, held: true);
      Debug.Assert(_taskForOutputProcessing == null && _enableOffering && !_messages.IsEmpty,
          "The block must be enabled for offering, not currently be processing, and have messages available to process.");

      // This method must not take the outgoing lock, as it will likely be called in situations
      // where a derived type's incoming lock is held.

      // Targets can only be inspected when the outgoing lock is already held by this thread.
      // When it is not, assume targets are available and let the processing task find out.
      Boolean targetsAvailable = true;
      if (outgoingLockKnownAcquired || TaskShim.IsEntered(OutgoingLock))
      {
        Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
        targetsAvailable = _targetRegistry.FirstTargetNode != null;
      }

      // If there's any work to be done...
      if (targetsAvailable && !CanceledOrFaulted)
      {
        // Create task and store into _taskForOutputProcessing prior to scheduling the task
        // so that _taskForOutputProcessing will be visibly set in the task loop.
        _taskForOutputProcessing = new Task(thisSourceCore => ((SourceCore<TOutput>)thisSourceCore).OfferMessagesLoopCore(), this,
                                             Common.GetCreationOptionsForTask(isReplacementReplica));

#if FEATURE_TRACING
        var etwLog = DataflowEtwProvider.Log;
        if (etwLog.IsEnabled())
        {
          etwLog.TaskLaunchedForMessageHandling(
              _owningSource, _taskForOutputProcessing, DataflowEtwProvider.TaskLaunchedReason.OfferingOutputMessages, _messages.Count);
        }
#endif

        // Start the task handling scheduling exceptions
#pragma warning disable 0420
        var exception = Common.StartTaskSafe(_taskForOutputProcessing, _dataflowBlockOptions.TaskScheduler);
#pragma warning restore 0420
        if (exception != null)
        {
          // First, log the exception while the processing state is dirty which is preventing the block from completing.
          // Then revert the proactive processing state changes.
          // And last, try to complete the block.
          // The exception is recorded exactly once here; recording it a second time would
          // presumably surface the same failure twice in the block's completion fault.
          AddException(exception);
          _taskForOutputProcessing = null;
          _decliningPermanently = true;

          // Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
          // Re-take the locks on a separate thread.
          Task.Factory.StartNew(state =>
          {
            var thisSourceCore = (SourceCore<TOutput>)state;
            lock (thisSourceCore.OutgoingLock)
            {
              lock (thisSourceCore.ValueLock)
              {
                thisSourceCore.CompleteBlockIfPossible();
              }
            }
          }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
        }
      }
    }

    #endregion

    #region ** OfferMessagesLoopCore **

    /// <summary>
    /// Body of the output-processing task.  Offers queued messages to targets until there is
    /// nothing more it can propagate, the per-task message limit is reached, or the block is
    /// canceled or faulted; afterwards it clears the processing task and either schedules a
    /// replacement task or attempts completion.
    /// </summary>
    [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
    private void OfferMessagesLoopCore()
    {
      Debug.Assert(_taskForOutputProcessing != null && _taskForOutputProcessing.Id == Task.CurrentId,
          "Must be part of the current processing task.");
      try
      {
        Int32 messageBudget = _dataflowBlockOptions.ActualMaxMessagesPerTask;

        // Offering requires the outgoing lock.  Taking it once per message keeps other
        // operations from being starved, while holding it across many messages is far
        // cheaper.  Blocks with no targets do no offering at all, and blocks that are
        // only linked to targets have nobody contending for the lock, so the choice
        // only matters for a block that has targets and is also being explicitly
        // received from, which is uncommon.  The compromise: hold the lock across a
        // batch of offers, but release it every so often by default.  A developer can
        // bound the batch size explicitly via MaxMessagesPerTask.

        const Int32 DefaultReleaseLockIterations = 10; // Dialable
        Int32 offersPerLockAcquisition;
        if (_dataflowBlockOptions.MaxMessagesPerTask == DataflowBlockOptions.Unbounded)
        {
          offersPerLockAcquisition = DefaultReleaseLockIterations;
        }
        else
        {
          offersPerLockAcquisition = messageBudget;
        }

        Int32 offeredTotal = 0;
        while (offeredTotal < messageBudget && !CanceledOrFaulted)
        {
          lock (OutgoingLock)
          {
            // Offer messages one after another while budget remains for both the
            // task and the current lock acquisition.  As soon as a message cannot
            // be propagated, stop until something changes in the future.
            Int32 offeredUnderLock = 0;
            while (offeredTotal < messageBudget && offeredUnderLock < offersPerLockAcquisition && !CanceledOrFaulted)
            {
              if (!OfferToTargets())
              {
                return;
              }

              ++offeredTotal;
              ++offeredUnderLock;
            }
          }
        }
      }
      catch (Exception exc)
      {
        // Remember the failure...
        AddException(exc);

        // ...and tell the owning block to stop accepting new messages.
        _completeAction(_owningSource);
      }
      finally
      {
        lock (OutgoingLock)
        {
          lock (ValueLock)
          {
            // This task is done processing, so clear the processing-task field.
            Debug.Assert(_taskForOutputProcessing != null && _taskForOutputProcessing.Id == Task.CurrentId,
                "Must be part of the current processing task.");
            _taskForOutputProcessing = null;
#if !NET40
            Interlocked.MemoryBarrier(); // synchronize with AddMessage(s) and its read of _taskForOutputProcessing
#else
            Thread.MemoryBarrier(); // synchronize with AddMessage(s) and its read of _taskForOutputProcessing
#endif

            // The loop may have stopped because a configured processing limit was hit
            // rather than because work ran out.  In that case a replacement task must
            // be spun up to carry on.
            OfferAsyncIfNecessary(isReplacementReplica: true, outgoingLockKnownAcquired: true);

            // If instead work ran out and no more will ever arrive, complete the block.
            CompleteBlockIfPossible();
          }
        }
      }
    }

    #endregion

    #region ** CompleteBlockIfPossible **

    /// <summary>
    /// Fast-path check for completion: hands off to CompleteBlockIfPossible_Slow when completion
    /// has not been reserved yet, the block is declining permanently, no processing task is
    /// active, and no reservation is pending on the next message.
    /// </summary>
    /// <remarks>On entry this asserts that both OutgoingLock and ValueLock are held.</remarks>
    private void CompleteBlockIfPossible()
    {
      Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
      Common.ContractAssertMonitorStatus(ValueLock, held: true);

      // Completion has already been claimed; nothing more to decide here.
      if (_completionReserved)
      {
        return;
      }

      Boolean nothingLeftToDo =
          _decliningPermanently && // declining permanently, so no more messages will arrive
          _taskForOutputProcessing == null && // no current processing
          _nextMessageReservedFor == null; // no pending reservation

      if (nothingLeftToDo)
      {
        CompleteBlockIfPossible_Slow();
      }
    }

    #endregion

    #region ** CompleteBlockIfPossible_Slow **

    /// <summary>
    /// Slow path for CompleteBlockIfPossible.  When the message queue is empty or the block is
    /// canceled or faulted, reserves completion and queues CompleteBlockOncePossible on the
    /// default scheduler.  Keeping this out of the fast path makes the fast path more likely
    /// to be inlined.
    /// </summary>
    private void CompleteBlockIfPossible_Slow()
    {
      Debug.Assert(
          _decliningPermanently && _taskForOutputProcessing == null && _nextMessageReservedFor == null,
          "The block must be declining permanently, there must be no reservations, and there must be no processing tasks");
      Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
      Common.ContractAssertMonitorStatus(ValueLock, held: true);

      // Messages remain and the block is neither canceled nor faulted: not done yet.
      if (!_messages.IsEmpty && !CanceledOrFaulted)
      {
        return;
      }

      _completionReserved = true;

      // Finish on a separate task rather than here, so that synchronous continuations
      // hanging off _completionTask.Task are not invoked while a lock is held.
      Task.Factory.StartNew(
          state =>
          {
            var thisSourceCore = (SourceCore<TOutput>)state;
            thisSourceCore.CompleteBlockOncePossible();
          },
          this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
    }

    #endregion

    #region ** CompleteBlockOncePossible **

    /// <summary>
    /// Completes the block: clears the target registry's entry points and the message buffer,
    /// moves the completion task to its final state (faulted, canceled, or successful), and then
    /// propagates completion to the previously linked targets.  This must only be called once,
    /// and only once all of the completion conditions are met; as such, it must only be called
    /// from CompleteBlockIfPossible.
    /// </summary>
    private void CompleteBlockOncePossible()
    {
      TargetRegistry<TOutput>.LinkedTargetInfo detachedTargets;
      List<Exception> recordedExceptions;

      // Taking the locks here keeps completion from racing ahead of whichever code
      // triggered it while that code still holds a lock.  The registry and buffers
      // are emptied to help avoid memory leaks.
      lock (OutgoingLock)
      {
        // Keep the list of targets so completion can be propagated to them afterwards.
        detachedTargets = _targetRegistry.ClearEntryPoints();
        lock (ValueLock)
        {
          _messages.Clear();

          // Take ownership of the exception list and null out the field, so that an
          // exception added this late by the target side ends up in a separate list
          // (which will be ignored).
          recordedExceptions = _exceptions;
          _exceptions = null;
        }
      }

      if (recordedExceptions == null)
      {
        if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
        {
          // Cancellation was requested: finish in a canceled state.
          _completionTask.TrySetCanceled();
        }
        else
        {
          // Nothing went wrong: finish in a successful state.
          _completionTask.TrySetResult(default(VoidResult));
        }
      }
      else
      {
        // Exceptions were recorded: finish in an error state.
        _completionTask.TrySetException(recordedExceptions);
      }

      // With the completion task completed, completion may flow on to the linked targets.
      _targetRegistry.PropagateCompletion(detachedTargets);
#if FEATURE_TRACING
      DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
      if (etwLog.IsEnabled())
      {
        etwLog.DataflowBlockCompleted(_owningSource);
      }
#endif
    }

    #endregion

    #region == GetDebuggingInformation ==

    /// <summary>Creates a debugging view over this source core for display in a debugger.</summary>
    /// <returns>A new <see cref="DebuggingInformation"/> instance wrapping this source core.</returns>
    internal DebuggingInformation GetDebuggingInformation()
    {
      var debugView = new DebuggingInformation(this);
      return debugView;
    }

    #endregion

    #region == class DebuggingInformation ==

    /// <summary>Exposes selected internal state of a source core for inspection in a debugger.</summary>
    internal sealed class DebuggingInformation
    {
      /// <summary>The source core whose state is exposed.</summary>
      private SourceCore<TOutput> _source;

      /// <summary>Initializes the debugging view.</summary>
      /// <param name="source">The source being viewed.</param>
      internal DebuggingInformation(SourceCore<TOutput> source)
      {
        _source = source;
      }

      /// <summary>Gets the number of messages currently in the source's message queue.</summary>
      internal Int32 OutputCount
      {
        get { return _source._messages.Count; }
      }

      /// <summary>Gets a list copy of the messages currently in the source's message queue.</summary>
      internal IEnumerable<TOutput> OutputQueue
      {
        get { return _source._messages.ToList(); }
      }

      /// <summary>Gets the task being used for output processing, if any.</summary>
      internal Task TaskForOutputProcessing
      {
        get { return _source._taskForOutputProcessing; }
      }

      /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
      internal DataflowBlockOptions DataflowBlockOptions
      {
        get { return _source._dataflowBlockOptions; }
      }

      /// <summary>Gets whether the source's completion task has completed.</summary>
      internal Boolean IsCompleted
      {
        get { return _source.Completion.IsCompleted; }
      }

      /// <summary>Gets the registry of targets linked from this block.</summary>
      internal TargetRegistry<TOutput> LinkedTargets
      {
        get { return _source._targetRegistry; }
      }

      /// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
      internal ITargetBlock<TOutput> NextMessageReservedFor
      {
        get { return _source._nextMessageReservedFor; }
      }
    }

    #endregion
  }
}
#endif